package com.atguigu.edu.app.dwd.db;

import com.atguigu.edu.util.KafkaUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class DwdTradeOrderPreProcess {
    public static void main(String[] args) throws Exception {
        // TODO 1 Prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Obtain the table configuration object
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Set an expiration time (TTL) for the data kept in state during the table joins
        configuration.setString("table.exec.state.ttl", "905 s");
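        // Alternatively (assuming Flink 1.13+), the same idle-state retention could be set
        // through the TableConfig API; this is an equivalent sketch, not what the original uses:
        // tableEnv.getConfig().setIdleStateRetention(java.time.Duration.ofSeconds(905));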


        // TODO 2 Set checkpointing and the state backend
        /*
        // If enabled, these lines additionally require
        // org.apache.flink.streaming.api.CheckpointingMode and
        // org.apache.flink.runtime.state.hashmap.HashMapStateBackend to be imported.
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(3 * 60 * 1000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop100:8020/edu");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
         */

        // TODO 3. Read the business data from Kafka and register it as a Flink SQL table
        tableEnv.executeSql("create table topic_db(" +
                "`database` String,\n" +
                "`table` String,\n" +
                "`type` String,\n" +
                "`data` map<String, String>,\n" +
                "`ts` string\n" +
                ")" + KafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_pre_process"));

        // TODO 4. Read order detail (order_detail) data
        Table orderDetail = tableEnv.sqlQuery("select \n" +
                "data['id'] id,\n" +
                "data['order_id'] order_id,\n" +
                "data['user_id'] user_id,\n" +
                "data['create_time'] create_time,\n" +
                "data['update_time'] update_time,\n" +
                "data['course_id'] course_id,\n" +
                "data['course_name'] course_name,\n" +
                "data['session_id'] session_id,\n" +
                "data['origin_amount'] origin_amount,\n" +
                "data['coupon_reduce'] coupon_reduce,\n" +
                "data['final_amount'] final_amount,\n" +
                "ts od_ts\n" +
                "from `topic_db` where `table` = 'order_detail' " +
                "and `type` = 'insert'\n");
        tableEnv.createTemporaryView("order_detail", orderDetail);

        // TODO 5. Read order (order_info) data
        Table orderInfo = tableEnv.sqlQuery("select \n" +
                "data['id'] id,\n" +
                "data['province_id'] province_id,\n" +
                "data['order_status'] order_status,\n" +
                "`type`,\n" +
                "ts oi_ts\n" +
                "from `topic_db`\n" +
                "where `table` = 'order_info'\n" +
                "and (`type` = 'insert' or `type` = 'update')");
        tableEnv.createTemporaryView("order_info", orderInfo);

        // TODO 6. Read course (course_info) data
        Table courseInfo = tableEnv.sqlQuery("select \n" +
                "data['id'] id,\n" +
                "data['subject_id'] subject_id\n" +
                "from `topic_db`\n" +
                "where `table` = 'course_info'\n" +
                "and `type` = 'insert'\n");
        tableEnv.createTemporaryView("course_info", courseInfo);


        // TODO 7. Join the three tables to build the order pre-process detail
        Table resultTable = tableEnv.sqlQuery("select \n" +
                "od.id,\n" +
                "od.order_id,\n" +
                "od.user_id,\n" +
                "od.create_time,\n" +
                "od.update_time,\n" +
                "od.course_id,\n" +
                "od.course_name,\n" +
                "od.session_id,\n" +
                "od.origin_amount,\n" +
                "od.coupon_reduce,\n" +
                "od.final_amount,\n" +
                "oi.province_id,\n" +
                "oi.order_status,\n" +
                "oi.`type`,\n" +
                "od.od_ts,\n" +
                "oi.oi_ts,\n" +
                "ci.subject_id\n" +
                "from order_detail od \n" +
                "join order_info oi\n" +
                "on od.order_id = oi.id\n" +
                "left join course_info ci\n" +
                "on od.course_id = ci.id\n" );
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 8. Create the Upsert-Kafka table dwd_trade_order_pre_process
        // (columns are declared in the same order as result_table, because TODO 9 inserts with
        //  "select *", which matches columns by position)
        tableEnv.executeSql("" +
                "create table dwd_trade_order_pre_process(\n" +
                "id string,\n" +
                "order_id string,\n" +
                "user_id string,\n" +
                "create_time string,\n" +
                "update_time string,\n" +
                "course_id string,\n" +
                "course_name string,\n" +
                "session_id string,\n" +
                "origin_amount string,\n" +
                "coupon_reduce string,\n" +
                "final_amount string,\n" +
                "province_id string,\n" +
                "order_status string,\n" +
                "`type` string,\n" +
                "od_ts string,\n" +
                "oi_ts string,\n" +
                "subject_id string,\n" +
                "primary key(id) not enforced\n" +
                ")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));

        // TODO 9. Write the join result into the Upsert-Kafka table
        // (executeSql submits this INSERT as the Flink job, so no explicit env.execute() is needed)
        tableEnv.executeSql("" +
                "insert into dwd_trade_order_pre_process \n" +
                "select * from result_table");

    }

}
